package operator;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.sql.*;
import java.util.Collections;
import java.util.concurrent.*;
import java.util.function.Supplier;

/**
 * Asynchronous Flink function that, for each incoming name, queries the
 * {@code people} table for the matching {@code country} and emits a
 * {@code (name, country)} tuple.
 *
 * <p>Result conventions:
 * <ul>
 *   <li>no matching row: the country is the empty string;</li>
 *   <li>several matching rows: the value of the last row returned wins;</li>
 *   <li>query failure (or the worker pool rejecting the task): the
 *       {@link ResultFuture} is completed exceptionally instead of emitting
 *       a tuple.</li>
 * </ul>
 *
 * <p>The blocking JDBC call runs on a dedicated thread pool created in
 * {@link #open(Configuration)} and released in {@link #close()}.
 */
public class AsyncDataBaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
    /** Lookup statement; the single parameter is the person's name. */
    private static final String COUNTRY_BY_NAME_SQL = "select country from people where name = ?";

    /** How long {@link #close()} waits for in-flight lookups before cancelling them. */
    private static final long SHUTDOWN_WAIT_SECONDS = 5L;

    // Thread pool that runs the blocking JDBC calls. Transient: it is a runtime
    // handle created in open(), never part of the serialized function.
    private transient ExecutorService executorService;

    // JDBC connection pool. Transient for the same reason as the thread pool.
    private transient DruidDataSource druidDataSource;

    /**
     * Starts an asynchronous country lookup for {@code key}.
     *
     * @param key          the name to look up
     * @param resultFuture completed with a single {@code (key, country)} tuple on
     *                     success, or exceptionally when the lookup fails
     */
    @Override
    public void asyncInvoke(String key, ResultFuture<Tuple2<String, String>> resultFuture) {
        final CompletableFuture<String> lookup;
        try {
            // Run the blocking query directly on our own pool; no second thread
            // is parked waiting on a Future.
            lookup = CompletableFuture.supplyAsync(() -> queryCountry(key), executorService);
        } catch (RejectedExecutionException e) {
            // Worker queue is full or the pool is shut down: report it through
            // the result future rather than leaving the record pending.
            resultFuture.completeExceptionally(e);
            return;
        }

        lookup.whenComplete((country, error) -> {
            if (error != null) {
                // Surface the failure. Emitting a tuple with a null field would
                // hide the error. NOTE(review): Flink tuple serialization is not
                // expected to accept null fields either - confirm.
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singleton(Tuple2.of(key, country)));
            }
        });
    }

    /**
     * Runs the lookup query synchronously on the calling thread.
     *
     * @param key the name to look up
     * @return the country of the last matching row, or {@code ""} when none matches
     * @throws CompletionException wrapping the {@link SQLException} if the query fails
     */
    private String queryCountry(String key) {
        // try-with-resources returns the connection to the pool and closes the
        // statement even when preparing or executing the statement throws.
        try (DruidPooledConnection connection = druidDataSource.getConnection();
             PreparedStatement statement = connection.prepareStatement(COUNTRY_BY_NAME_SQL)) {
            statement.setString(1, key);
            try (ResultSet rows = statement.executeQuery()) {
                String country = "";
                while (rows.next()) {
                    country = rows.getString("country");
                }
                return country;
            }
        } catch (SQLException e) {
            throw new CompletionException("Country lookup failed for name '" + key + "'", e);
        }
    }

    /**
     * Creates the JDBC connection pool and the worker thread pool.
     *
     * @param parameters the Flink configuration passed to the function
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // NOTE(review): connection settings and credentials are hard-coded in
        // source; consider supplying them through job configuration instead.
        druidDataSource = new DruidDataSource();
        druidDataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        druidDataSource.setUsername("bigdata");
        druidDataSource.setPassword("bigdata123");
        druidDataSource.setUrl("jdbc:mysql://192.168.20.250:3306/bi-test");

        // Thread pool used to execute the asynchronous lookups.
        executorService = new ThreadPoolExecutor(5, 15, 1,
                TimeUnit.MINUTES,
                new LinkedBlockingDeque<>(100));
    }

    /**
     * Stops the worker pool, then closes the connection pool.
     *
     * <p>The executor is shut down first so that in-flight lookups are not handed
     * a closed data source; each step runs even if an earlier one throws.
     *
     * @throws Exception if releasing a resource or the superclass close fails
     */
    @Override
    public void close() throws Exception {
        try {
            // Stop the thread pool, giving running lookups a short grace period.
            if (executorService != null) {
                executorService.shutdown();
                if (!executorService.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                }
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            // Preserve the interrupt for the caller (e.g. task cancellation).
            Thread.currentThread().interrupt();
        } finally {
            try {
                // Close the connection pool.
                if (druidDataSource != null) {
                    druidDataSource.close();
                }
            } finally {
                super.close();
            }
        }
    }
}
